Redis value

Redis 除了对键的操作,当然也有不少对值的操作,通常,就比如 GETRANGE 即截取一部分变量/值信息,在 python 等语言中可以通过 [key:n] 来进行访问。

1
2
127.0.0.1:6379> GETRANGE key_name 1 4
"1234"

就如上述 code 中,1 是一个起始数,而 4 是一个结尾数,该键的值为 0123456,因此该命令可以实现出截取键值的效果。

Id Name Info Command
1 GETRANGE 截取键值并返回 GETRANGE key_name start end
2 GETSET 重新设置键值的时候返回旧的键值数据 GETSET key_name "Hello,world" (上一个 key_name 的值是 0123456,因此返回的也是他)
3 GETBIT 获取 key 所存储字符串的偏移量上的位(bit) SETBIT value offset
4 SETRANGE 通过偏移量来修改 key 中的值 SETRANGE key_name 5 " World!"
5 SETBIT 修改 key 所存储的字符串上的偏移量 (bit) SETBIT key_name 2 1
6 MGET 获取一个或多个 key MGET key_name1 key_name2
7 MSET 设置多个 key(依然会通过新值替换旧值) MSET key_one "one" key_two "two"
8 MSETNX 用于设置多个 key,遵循原子性操作(0失败,1成功) MSETNX key_one "one" key_two "two"
9 SETNX 如果 key 存在则什么都不做,不存在则插入 SETNX key_name "1" (如果 key_name 存在则返回 0,否则返回 1)
10 STRLEN 返回 key 值的字符串长度(如果存储的不是字符串类型则报错) STRLEN key_name
11 APPEND 该命令的好处就是直接在原有的键值中直接拼合 APPEND key_one "Hello" and APPEND key_one " World!" show Hello World!

获取或存储偏移量上的位

偏移量(Offset)简单概括就是存储地址和实际地址之间所存在的空段之间的距离,这通常被称之为有效地址或偏移量。

GETSET


我们以 GETSET 命令为例,首先 H 的二进制数为:01001000 ,通过 GETSET 分别输出一组后可以看到结果正是二进制数,因此本命令也就是获取 value 内的偏移量位数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
127.0.0.1:6379> GETBIT key_name 0
(integer) 0
127.0.0.1:6379> GETBIT key_name 1
(integer) 1
127.0.0.1:6379> GETBIT key_name 2
(integer) 0
127.0.0.1:6379> GETBIT key_name 3
(integer) 0
127.0.0.1:6379> GETBIT key_name 4
(integer) 1
127.0.0.1:6379> GETBIT key_name 5
(integer) 0
127.0.0.1:6379> GETBIT key_name 6
(integer) 0
127.0.0.1:6379> GETBIT key_name 7
(integer) 0

SETBIT

.SETBIT 在 redis 中,主要用于更改 key 的存储偏移量,如 SETBIT key_name 2 1 则表示将从0,左到右开始计数,替换原有的位数从而替换为 1,让其从 H 改为 h

.

1
2
3
4
127.0.0.1:6379> SETBIT key_name 2 1
(integer) 0
127.0.0.1:6379> GET key_name
"hello,world"

SETRANGE

1
2
3
4
5
6
7
127.0.0.1:6379> SET key_name "Hello"
OK
127.0.0.1:6379> SETRANGE key_name 5 " World!"
(integer) 12
127.0.0.1:6379> GET key_name
"Hello World!"
127.0.0.1:6379>

自增与自减

Id Name Info Command
1 INCR 将数值自曾(key 不存在时为 0,假设运行100次 INCR 则 数值也会被自曾到 100,每运行一次数值 +1) INCR key_one
2 INCRBY 在原有数值的基础上指定增加数值 INCRBY key_one 10(假设 key 内值为2,则通过该命令增加 10,即12)
3 DECR 在原有数值基础上来进行值减 DECR key_one (假设key内值为100,运行100次则为0)
4 DECRBY 在原有基础上自定义减值数 DECRBY key_one 9 (假设 key 值内为 10,则直接减为 9)
5 INCRBYFLOAT 为键的存储值中增加,可以执行像 1.03e+08 这样的科学计数法 INCRBYFLOAT key_name -0.01 \ INCRBYFLOAT key_name 1.03e08

INCRBYFLOAT

INCRBYFLOAT 的主要作用就是可以浮点数的增量以及支持诸如 1.03E+08 这样的科学计数法,此外无论加法计算所得到的精度实际长度有多长,该命令最终的结果只保留小数点后十七位数。

1
2
3
4
5
6
7
127.0.0.1:6379> SET key_name 1.04
OK
127.0.0.1:6379> INCRBYFLOAT key_name -0.01
"1.03"
127.0.0.1:6379> INCRBYFLOAT key_name 1.03e08
"103000001.02999999999883585"
127.0.0.1:6379>

e 是一个科学计数法符号,通常以 “E” 表示,在科学计数法中为了公式的简便,可以将 1.03 x10的次方简写为 1.03E+08 的形式

1.03 x 10的8次方,也就是 8个10相乘,之后在乘于 1.03,只保留十七位小数则应为 103000001.02999999999883585

本文使用《江雪分析公开知识存储库知识共享许可证》进行发布

Redis Keys

键(keys)通常背后都会代表着一个数据结构,就已字符串的形式来称,他会表现为 key_name=value_name 在 Redis 中的命令与常见的数据传输码类似,如 SET 是创建、GET 是请求。

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> SET key_name hello,world!
OK
127.0.0.1:6379> GET key_name
"hello,world!"
127.0.0.1:6379> DEL key_name
(integer) 1
127.0.0.1:6379> GET key_name
(nil)
127.0.0.1:6379>

在上面的命令中,我们通过 SET 创建了一个 key_name,他的值是 **hello,world!**,并通过 GET 进行请求,毫无疑问返回的是对应的值。

键的操作

在这个之间还使用到了 DEL 他是删除键的意思,而对于键的操作,并不只有这一条命令:

Id Name Info Command
1 DEL 删除 Key DEL value_name
2 DUMP 序列化 key,返回所序列化的值 DUMP key_name
3 EXISTS 检查 key 是否存在 EXISTS key_name (存在返回1,不存在返回0,虽然你通过 GET 也可以媲美 EXISTS 的效果)
4 RANDOMKEY 随机从数据库中抽取一个 key RANDOMKEY
5 RENAME 修改 key 的名称 RENAME key_name new_name
6 RENAMENX 为了避免错误,虽然与 RENAME 同样是修改名称,但 RENAME 会直接覆盖(无论是否存在),而如果用此命令如果是相同的键名则会返回错误(0)创建成功会返回 1 RENAMENX key key_name
7 TYPE 返回 key 所存储的数据类型 TYPE key_name

易失 key

Id Name Info Command
1 EXPIRE 设置 key 的过期时间 EXPIRE key_name 10(```EXPIRE 的单位是秒,即10秒后过期)
2 PEXPIRE 设置 key 的过期时间(毫秒) PEXPIRE key_name 10
3 PSETEX 设置 key 的过期时间(毫秒) PTTL key_one
4 EXPIREAT 通过 UNIX 时间戳的方式为 key 设置过期时间 EXPIREAT key_name 1626521040 (2021-07-17 19:24 过期)
5 PEXPIREAT 通过 Unix 时间戳的形式来设置 key 过期时间,与 EXPIREAT 的区别是他需要精确到毫秒 PEXPIREAT key_name 1626521962 (2021-07-17 19:39:22 过期)
6 PERSIST 移除 key 的过期时间,让其成为持久 key PERSIST key_name
7 PTTL 以毫秒为单位返回 key 剩余的过期时间 PTTL key_name
8 TTL 以秒为单位返回 key 剩余的过期时间 TTL key_name
9 SETEX 同时设置过期时间为秒和键值 SETEX key_name 10 "hello"

所谓的 易失 KEY 主要就是被 Redis 命令所设置了过期时间的(如EXPIRE\EXPIRET\PEXPIRE\PEXPIREAT)所进行设置的。

这些易失的KEY只能通过使用 DEL、SET、GETSET、PERSIST 等 STORE(存储) 命令进行清楚或覆盖成为 持久 key(persistent)

只能通过覆盖或清除的方式来让其成为持久key,像诸如更改 key 名称、修改 key 值、自增的做法并不适用,过期时间依然会移植到新的 key 上。

对于通过 EXPIREAT 命令来使用 Unix 时间戳让 key 成为易失 KEY 的方式,可以通过:https://tool.chinaz.com/tools/unixtime.aspx 转换工具来完成。

于此同时需要注意的是 EXPIRESETEX 相比之下 SETEX 区别就是原子性的操作

原子性就是要么同时发生,要么就什么都不发生

关于过期时间的准确性

由于我们所使用的是 redis 2.6 因此过期时间的精度往往比 redis 2.4 提高到 0~1 毫秒之多。需要注意的是如果使用绝对 Unix 时间戳的方式进行存储,那么无论 Redis 是否运行都会流逝,但如果服务器时间并不精准,那也会导致易失 key 可靠性会降低。

过期时间的淘汰流程

Redis key 的过期时间被分为两种淘汰过程,分别为被动方式和主动,而被动方式最好理解的就是当你访问了该 key ,而这时候恰好该 key 是过期的,那么这个 key 就会被发现之后淘汰了。

当然这种方式存在缺点,假设你这个 key 永远不会被访问,那么他岂不是会存在直到你访问的那天,于是 主动过期就出来了。

主动过期的淘汰方式也就是周期性的主动随机检查一部分被设置生存时间的 key,当扫到过期时间到了的 key 将会在 key 空间中删除,Redis 每 10 秒都会执行这个操作。

本文使用《江雪分析公开知识存储库知识共享许可证》进行发布

Redis 安装与简介

Redis 是一个遵循 BSD 协议,高性能且灵活的 key-value 数据结构存储,通常可以用于作为数据库、缓存和消息队列等应用,由于遵循的是 key-value 数据结构存储,他对比其他产品的特点在于:

  1. 支持数据的持久话,可以将数据保存内存或者磁盘中,重启时可以可以加载到缓存或者内存中使用
  2. 支持简单的 key-value 类型数据的数据(同时提供了 list、set、zset、hash 结构存储)
  3. 高性能: redis 的一大特点,一个入门级的 Linux 服务器中可以每秒写入(SET)11w次,读取(GET)8.1w次,同时还支持 Pipeling 命令。
  4. 持久化:也就是说数据都存在内存中的时候,可以根据上次保存的到目前的时间来更新次数,以此他通过异步保存到磁盘上。
  5. 数据结构:Redis 所支持的数据结构有很多,除了常见的字符串、散列、集合、列表等还带有了数据集、位图、超级日志和担忧半径查询的地理空间索引等……
  6. 主/从复制:Redis 分为 client/server,主/从复制只需要一行配置文件即可达成
  7. 生态支持:Redis 支持多个语言,如 Java、JavaScript(含 node.js)、Lua、Objective-C、PHP、Perl、R、Ruby、Scale、Go、C、C++、Python 等主流语言……

安装 Redis

本文即主要介绍 Linux redis 的安装与启动,redis 的安装非常简单,需要通过下载在解压之后重新编译即可:

1
2
3
4
wget http://download.redis.io/redis-stable.tar.gz
tar -zxvf redis-6.0.8.tar.gz
cd redis-6.0.8
make

之后编译完成即可通过通过 cd src 命令进入目录,来启动 redis-server/client 客户端

进入后先启动服务端在启动客户端

./redis-server 以及 ./redis-client,当然这种启动的方式是使用默认配置文件的,如果你对服务也有调整即可使用制定的配置文件启动 ./redis-server ../redis.conf

当然既然有了服务端,那么你也可以将服务端在远程服务器启动,之后通过下述命令启动 redis-client:

1
redis-cli -h host -p port - a "password"

当上述的命令执行完后,在客户端中输入 PING 来查看是否正常启动:

1
2
127.0.0.1:6379> PING
PONG

当返回的是 PONG 的时候即代表服务正常,如果所返回的是 Could not connect to Redis at 127.0.0.1:6379: Connection refused 将会代表服务端出现了问题或断开,这时请仔细检查服务端运行情况。

配置 Redis

在默认的情况下,Redis 是没有密码的,如果需要检测目前是否设置密码,需要通过 CONFIG GET requirepass 进行检测,如果属性为空,则可以通过下述命令来设置密码:

1
2
3
4
5
127.0.0.1:6379> CONFIG SET requirepass "toor"
OK
127.0.0.1:6379> CONFIG GET requirepass
1) "requirepass"
2) "toor"

这时候我们重新启动 redis-cli 后创建一个键,会返回 (error) NOAUTH Authentication required.,这将表示我们之前所整的操作已经被应用,只需要进行身份验证即可:

1
2
127.0.0.1:6379> AUTH "toor"
OK

本文使用《江雪分析公开知识存储库知识共享许可证》进行发布

Spring cloud Gateway 路由容错


路由容错控制器的主要的运用场景就是在遇到超时、服务器端错误来发挥作用,通过 Gateway 来集成 Hystrix(含 resilience4j)通过过滤器来加入 fallbackUri 得以实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.example.demo.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

/**
* 容错控制器
*
* @author kunlun
* @date 2021/7/16
*/
@RestController
public class NotFoundController {


@RequestMapping(value = "/fallback")
public Mono<Map<String, String>> notFound() {
Map<String, String> stringMap = new HashMap<>();
stringMap.put("code", "503");
stringMap.put("data", "Current service is unavailable");
stringMap.put("gateway", "GatewayConfigurationFaultToleranceRouting");
return Mono.just(stringMap);
}
}

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
spring:
application:
name: GatewayConfigurationFaultToleranceRouting
cloud:
gateway:
routes:
- id: hey
uri: lb://service-provider/hey
predicates:
- Path=/hey
- Method=GET
filters:
- AddRequestParameter=name,key
# 服务熔断后使用的地址 @RequestMapping(value = "/fallback")
- name: CircuitBreaker
args:
name: fallback
fallbackUri: forward:/fallback
discovery:
locator:
# 允许服务发现
enabled: true
consul:
host: localhost
port: 8500
discovery:
service-name: service-provider

server:
port: 8210

需要注意的是由于我们这个是熔断,需要服务提供者的存在,因此你可以将服务提供者快速关闭来达到他短时间无法提供服务,之后网关才可认定这是由于服务出错导致的,如果直接关闭服务提供者那返回的只是 404

最后启动 consul 集群以及服务提供者,通过 GET 方式请求 http://localhost/hey 即可完成路由容错控制器的实现,当然你也可以直接通过 lb://service-provider 来进行配置你的服务路由。

本文使用《江雪分析公开知识存储库知识共享许可证》进行发布

Spring cloud GatewayFilter and GlobalFilter

过滤器之所以在网关中存在,是因为在微服务系统中,服务提供者非常的多,因此如果在每个服务中都有诸如鉴权、限流以及日志输出等则会占用服务的消耗(也就是说每个服务我都要写一遍这些功能,则会让开发很烦躁)因此可以直接通过网关来进行处理。

网关应可以处理鉴权、限流等工作

PRE and POST

PRE

根据请求的生命周期,在 Gateway 中,过滤器会分为 PRE、POST 两种,其中 PRE 代表 在路由执行之前的请求 执行该过滤,因此主要应用在参数校验、鉴权、流量监控、日志输出、协议转换功能。

POST

POST 主要是在 请求被路由跳到微服务之后所执行 的过滤器,因此这种过滤器可以实现出相应头(HTTP Header)以及收集统计信息和指标、响应转发、日志输出、流量监控等功能。

GatewayFilter and GlobalFilter

GatewayFilter (网关过滤器)

过滤器分为两类,其中网关过滤器(GatewayFilter)只会在 单个路由或者分组路由中。网关过滤器是一种 可以修改请求传入的 HTTP 请求或输出 HTTP 相应 的特定路由使用,Gateway 内置了 20 多种网关过滤器工厂来编写网关过滤器:

Id Name type Info
1 AddRequestHeader Request 用于在请求头中添加自定义的键值对
2 AddRequestParameter Request 用于在请求参数中添加请求参数的键值对
3 AddResponseHeader Response 用于在相应头中添加
4 Hystrix 断路器 用于将断路器引入网关路由中,可以让服务免受级联故障影响,并在故障时提供回退响应(使用时需要使用名为 HystrixCommand 的参数)
5 PrefixPath Path 用于使用简单的 Prefix
6 PreserveHostHeader Header 设置路由过滤器的请求属性,检查是否发送原主机头或由 HTTP 客户端确定主机头
7 RequestRateLimiter Request 用于确定当前请求是否继续(如果不允许则会返回 HTTP 429 - Too Many Request
8 RedirectTo Request 用于接受请求的状态和 URL 参数(该状态是一个重定向的 300 系列 HTTP 代码,类似与 301。但 URL 是 Location 头部的值)
9 RemoveNonProxyHeaders Headers 从转发的请求中删除请求头
10 RemoveRequestHeader Request 通过请求头名删除请求头
11 RemoveResponseHeader Response 通过响应头名删除响应头
12 RewritePath Path 通过 Java 正则表达式重写请求路径
13 SaveSession Session 在转发下游调用之前,强制执行保存 Session 的操作
14 SecureHeaders Headers 为响应头添加安全头
15 SetPath Path 该方法允许通过路径的模板段来操作请求路径,使用 Spring 的 URI 模板,支持多种匹配
16 SetResponseHeader Response 需要通过 Key-Value 对来设置响应头
17 SetStatus Response 用于设置请求响应状态,但需要 Status 参数(需要有效的 Spring HttpStatus,例如整形的 404 或枚举类型字符串 NOT_FOUND
18 StripPrefix Request 用于剥离前缀(需要 parts 参数,用于表明请求被发送到下游之前从请求路径中所剥离的元素数量)
19 Retry Request 主要用于重试,但需要 Retries、Statuses、Methods、Series 等参数
Retries:重试的次数
Statuses:重试的 HTTP 状态码(通过 org.springframework.http.HttpStatus 表示)
Methods:重试的 HTTP 状态码(通过 org.springframework.http.HttpMethod 表示)
Series:重试的状态码(通过 org.springframework.http.HttpStatus.Series 表示)
20 RequestSize Request 用于限制请求大小,当请求超过限制时启用,限制请求达到下游服务,该过滤器将 RequestSize 作为参数
AddRequestHeader(在请求头中添加自定义的键值对)

AddRequestHeader 即主要在请求头中来添加自定义的键值对,其主要将应用过滤器 AddRequestHeader=X-Request-Name,X-Request-Value 应用并添加至请求头中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
application:
name: Gateway
cloud:
gateway:
routes:
- id: /guonei
uri: http://news.baidu.com
predicates:
- Method=GET
filters:
- AddRequestHeader=X-Request-Name,X-Request-Value

server:
port: 8210

logging:
level:
ROOT: DEBUG

.AddRequestHeader 会将 X-Request-Name=X-Request-Value 标头添加到所有请求头中。

AddRequestParameter (添加请求参数)

AddRequestParameter 主要的作用就是在请求头(Header)中添加请求参数的键值对,即将 RouteDefinition /guonei 过滤器 _genkey_0=KEY_NAME, _genkey_1=KEY_VALUE 应用在 AddRequestParameter (添加请求参数)中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
application:
name: Gateway
cloud:
gateway:
routes:
- id: /guonei
uri: http://news.baidu.com
predicates:
- Method=GET
filters:
- AddRequestParameter=KEY_NAME,KEY_VALUE

server:
port: 8210

logging:
level:
ROOT: DEBUG

这会将 KEY_NAME=KEY_VALUE 添加到所有匹配请求的下游请求所查询的字符串中,AddRequestParameter 即添加请求参数会用于匹配路径或主机的 URI 变量。

AddResponseHeader (添加响应头)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
application:
name: Gateway
cloud:
gateway:
routes:
- id: /guonei
uri: http://news.baidu.com
predicates:
- Method=GET
filters:
- AddResponseHeader=X-Response-Name,X-Response-Value

server:
port: 8210

logging:
level:
ROOT: DEBUG

此刻 AddResponseHeader 会将 X-Response-Name=X-Response-Value 所添加至响应头中。

GlobalFilter (全局过滤器)

另一种则为全局过滤器(GlobalFilter)这种过滤器与网关过滤器的区别是会 应用到所有路由中,在 Gateway 内置中,有九种全局过滤器:

Id Name Info
1 Forward Routing Filter 在 exchange 的 ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 属性值中查找 URI(如果 URI 是 forward 协议,则将它用于 DispatcherHandler (调度程序处理程序) 处理请求)
2 LoadBalancerClient Filter 在 exchange 的 ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 值中查找 URI,如果 URI 是 lb 协议,则会使用 Spring cloud LoadBalancerClient 将名称(lb://myservice 中的 myservice)解析为实际的主机和端口,并替换 URI 中相同属性。(除此过滤器还会查看 ServerWebExchangeU体力是。GATEWAY_SCHEME_PREFIX_ATTR 属性来判断他是否等于 lb
3 Netty Routing Filter 假设 ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 中的 URI 使用的是 HTTP 或 HTTPS 协议,则运行 Netty Routing Filter。他用 Netty HttpClient 所发出的下游代理请求。相应应该放在 ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR 的 exchange 中以便在以后的过滤器中使用
4 Netty Write Response Filter ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR 值中存在 NettyHttpClientResponse 则会运行 Netty Write Response Filter。(他在所有其他过滤器完成后运行,并将代理相应写回到网关客户端的响应数据中)
5 RouteToRequestUriFilter 如果 ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR 值中存在 Route 对象,则会将路由到请求地址,他会根据请求 URI 创建一个新的 URI。新的 URI 则会位于 ServerWebExchangeUtils.GATEWAY_REQUEST_URI_ATTR 值中(假设 URI 具有协议前缀,如 lb:ws://serviceid 则会将从 URI 中所剥离 lb 协议,并防止在 ServerWebExchangeUtils.GATEWAY_SCHEMEPREFIX_ATTR 中以便稍后在过滤器中所使用)
6 Websocket Routing Filter 假设 ServerWebExhcangeUtils.GATEWAY_REQUEST_URL_ATTR 值中的 URI 是 ws 或 wss 协议,则运行 Websocket Routing Filter (利用 Spring Web Socket 底层代码将 WebSocket 请求转发到下游)
也可以通过 URI 前面添加 lb 来对 Websocket 来进行负载均衡,如 lb:ws://serviceid(需要注意的是如果在普通 HTTP 上使用 SockJS 来作为回退,则会匹配正常的 HTTP 路由以及 Websocket 路由)
7 Gateway Metrics Filter 使用需要添加 spring-boot-starter-actuator 依赖,默认会通过 spring.cloud.gateway.mertics.enable 设置为 false Gateway Metrics Filter 才会运行。该过滤器会添加一个名为 gateway.request 指标(Metrics),并包含四个属性:
routeId:路由器 ID
routeUri:API将会被路由转到 URI
outcome:由 HttpStatus.Series 所分类的结果
status:HTTP 请求返回给客户端的状态
8 Combined Global Filter and GatewayFilter Ordering(组合是全局过滤器和网管过滤器排序) 当请求进入并匹配到一个路由时, Filtering Web Handler 会将 GlobalFilter 的所有实例和 GatewayFilter 的所有路由特定实例添加到过滤器链中。(这个组合的过滤器链由 org.springframeworking.core.Ordered 接口的排序,并通过 gegOrder() 方法或注解 @Order 来设置)同时网关也会对过滤器逻辑执行的 “PRE” 和 ”POST“ 阶段来进行区分
9 Marking An Exchange As Routed (路由交换) 网关在路由 WEB 服务交换后(ServerWebExchange),会将 Gateway Already Routed 添加到 exchange 属性来将该交换标识为 ”路由”(一旦被请求标识为路由,则其他的路由过滤器会跳过该请求,也可以通过便携方法将交换标识为路由,或检查交换是否已经成为路由)

路由容错

路由的容错主要分为两种,分别为需要处理为定义路由、之后通过 Hystrix 来处理路由的熔断,这也是路由容错最为核心的一点

处理未定义路由

处理未定义路由所代表的就是当路由并不存在时,所返回的错误信息,他的目的就是所提供的报错信息来对用户或开发人员给出有效的反馈。

本文使用《江雪分析公开知识存储库知识共享许可证》进行发布

Spring cloud Gateway 路由谓词工厂


Gateway 是 Spring cloud 官方所推出的第二代网关(API Gateway)框架,在为服务系统中,网关的主要作用就是当用户调用的所有微服务,都要经过网关。

在为服务中,除于不同微服务可能会带来不同的开发语言和协议,因此需要通过网关来处理服务的调用

网关统一向外部系统提供 REST API,在 Spring cloud 中,使用 Zuul、Gateway 等作为网关可以实现出动态路由、监控、回退、安全功能等。

Spring cloud Gateway


Gateway 是 Spring cloud 官方所推出的第二代网关(API Gateway)框架,在为服务系统中,网关的主要作用就是当用户调用的所有微服务,都要经过网关。

在为服务中,除于不同微服务可能会带来不同的开发语言和协议,因此需要通过网关来处理服务的调用

网关统一向外部系统提供 REST API,在 Spring cloud 中,使用 Zuul、Gateway 等作为网关可以实现出动态路由、监控、回退、安全功能等。

特点

Gateway 基于 Spring Framework 5、Project Reactor 、Spring boot 2.0 构建,可以匹配任何请求属性的路由,能编写谓词(Predicates)以及过滤器(Filters)等。

谓词主要是用于确定给定输入的 true 或 false,在网关中谓词和过滤器也可以用于特定的路由。

过滤器可以重写数据,但由于 Gateway 是异步的,如果需要对响应的 body 进行修改需要使用 writeWith() 所提供的 GlobalFilter、GatewayFilter 类型。

其中 GlobalFilter 是对所有路由有效的,而 GatewayFilter 类型仅对指定范围生效

除此之外 Gateway 还支持路径重写、动态路由和集成了 Hystrix 断路器以及 DiscoveryClient 的集成和限流。

工作流程

首先 HTTP/HTTPS 强求 Spring cloud Gateway,DispatcherHandler 接受请求,并通过 RoutePredicateHandlerMapping 进行路由匹配,如果网关路由与请求的路由进行匹配 则将请求发送到 FilteringWebHandler,如果不匹配,那么则将请求发送给 DispatcherHandler 进行处理。

最后 FilteringWebHandler 通过自定义的过滤器来发送请求,这会将请求转发到具体的服务中,最后处理结果给用户。

谓词接口和谓词工厂

谓词(Predicate),在 Java 8 中引入的一个函数式接口,主要用于接受输入参数并返回布尔值的结果,Spring cloud Gateway 通过 Predicate 接口来判断当前路由是否满足指定条件。

谓词工厂(Route Predicate Factories)主要作用就是当符合谓词条件就使用该路由进行匹配,否则就忽略。

Spring cloud Gateway 的路由规则是由 RouteDefinitionLocator 类进行管理,默认情况下使用 Spring Boot 的 @ConfigurationProperties 机制来加载属性。

实现

Java API


可以通过最简单的 Java API 的方式来构建路由,主要通过 @Bean 来实现一个自定义的 RouteLocator 类来实现 自定义路由转发规则。在此之前,需要通过添加 gateway 依赖来完成:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

由于 Spring cloud gateway 使用的是 Netty+WebFlux 实现的,因此不需要引入 Web 模块依赖。

之后在启动类添加自定义 RouteLocator 类即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;

/**
* 将请求转发给 baidu.com
*
* @author kunlun
* @date 2021/7/8
*/
@SpringBootApplication
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}

@Bean
public RouteLocator routeLocator (RouteLocatorBuilder builder) {
return builder.routes().
route("guonei", r -> r.path("/guonei/**")
.uri("http://news.baidu.com/")).build();
}

}

在上述的过程中,route 主要将与 guonei 路由想匹配的通过 Spring cloud Gateway 转发到了 http://news.baidu.com/guonei 中,因此访问 http://localhost:8081/guonei 就是访问 http://news.baidu.com/guonei

其中 r.path 参数下的 /guonei/** 指的是这将会匹配包含所有任何后缀携带 /guonei 的路径模式,还需要注意的是 Spring cloud Gateway 支持两种的配置,上面所实现的是基于RouteLocator 的实现,还有 properties 配置文件的实现

application.yml / application.proerties

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
cloud:
gateway:
routes:
- id: guonei
uri: http://news.baidu.com/
predicates:
- Path=/guonei
application:
name: GatewayForward

server:
port: 8081

与 Java API 一样,访问 http://localhost:8081/guonei 就是访问 http://news.baidu.com/guonei,同样的实现了路由的功能。

路由规则(RoutePredicateFactory)

在 Gateway 中,主要通过 RoutePredicateFactory(谓词工厂)来创建谓词(Predicate),因此也提供了很多种谓词工厂,这也是在上面所提到的:“谓词工厂(Route Predicate Factories)主要作用就是当符合谓词条件就使用该路由进行匹配,否则就忽略。”

通过谓词工厂,使得开发人员可以简单配置即可获得很多需要和想要的路由规则,及谓词(Predicate),这些路由规则会根据 HTTP 请求进行不同属性来匹配,其中最为主要的为下述几类:

RoutePredicateFactory type info
AfterRoute……(省略 PredicateFactory) datetime 请求时间满足在配置时间之后
BeforeRoute datetime 请求时间满足在配置时间之前
BetweenRoute datetime 请求时间满足在配置时间之前
CookieRoute Cookie 请求指定了 Cookie 正则匹配的指定值
HeaderRoute Header(数据头) 请求指定了请求上下文所匹配的指定值
CloudFoundryRouteServiceRoute Header 请求 Headers 是否包含了指定的名称
MethodRoute Method(方法) 请求的方法是否匹配配置的方法
PathRoute Path(路径) 请求路径是否匹配指定值
Query Queryparam(查询参数) 请求查询的查询参数与配置文件的相匹配
RemoteAddreRoute Remoteaddr(远程地址) 远程地址是否匹配指定值
HostRoute Host(主机) 请求主机是否匹配指定值

After

AfterRoutePredicateFactory(After路由谓词工厂)是一个 datetime(日期时间) 类型的为此,他主要表达的是如果请求在配置文件中 时间在配置文件之后 发生的请求允许。

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
cloud:
gateway:
routes:
- id: after
uri: http://news.baidu.com/
predicates:
- After=2022-01-20T18:06:06+08:00[Asia/Shanghai]
application:
name: GatewayForward

server:
port: 8081

After 的核心意思就是只能在配置规定后的时间进行访问,但只能通过 UTC + 时区的方式来添加规则,否则将直接报错。After 的主要作用就是在 2021 年 1 月 20 日之后才可进行访问,否则将会直接报错,直接访问 http://localhost:8081/ 即可。

Before

Before 也是属于 datetime 类型之一,其核心含义是当请求满足配置时间之前,而之前的 After 路由谓词工厂只是满足配置时间之后:

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
cloud:
gateway:
routes:
- id: after
uri: http://news.baidu.com/
predicates:
- Before=2022-01-20T18:06:06+08:00[Asia/Shanghai]
application:
name: GatewayForward

server:
port: 8081

Between

Between 可以理解为是两个 UTC 时间之间的请求,因此他有两个参数,分别是 datetime1 以及 datetime2 这是一个 ZonedDateTime 对线,因此请求需要满足 datetione1 之后且 datetime2 之前的请求:

1
2
3
4
5
6
7
8
9
10
11
spring:
cloud:
gateway:
routes:
- id: after
uri: http://news.baidu.com/
predicates:
- Between=2019-01-20T18:06:06+08:00[Asia/Shanghai],2022-01-20T18:06:06+08:00[Asia/Shanghai]
application:
name: GatewayForward

如上面的 Between 路由规则是,在 2019-01-20T18:06:06+08:00[Asia/Shanghai] 之后,2022-01-20T18:06:06+08:00[Asia/Shanghai] 之前的请求将会进行匹配,否则将会忽视。

CookieRoutePredicateFactory 主要用于匹配指定值,虽然很多作者说直接可以一条参数走到低:mycookie,mycookievalue,但很遗憾并没有作用,因此只能通过全展开的方式来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
spring:
cloud:
gateway:
routes:
- id: after
uri: http://news.baidu.com/
predicates:
- name: Cookie
args:
name: meteor_login_token
regexp: ipQdtc89kEO7-JaBWLdgwPHzX9oDy6l-2qXmZ_OeHye
application:
name: GatewayForward

server:
port: 8081

在上述的 cookie 中,主要指定了 cookie 名字 meteor_login_token,以及 cookie_value ipQdtc89kEO7-JaBWLdgwPHzX9oDy6l-2qXmZ_OeHye,因此如果请求的 cookie 与配置文件相匹配则通过,否则将会忽略。

Header 即请求上下文的头部信息,当请求与配置的 Header 一致时进行转发,在官方文档中被称之为:X-Request-Id 与正则表达式合在一起进行匹配,但并没有什么作用。因此我们可以理解为 Header 请求头是根据上下文头部信息来指定服务器的域名和服务器正在侦听的 TCP 端口号:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
cloud:
gateway:
routes:
- id: after
uri: http://news.baidu.com/
predicates:
- Header=Host,localhost:8081

application:
name: GatewayForward

server:
port: 8081

当请求的上下文头部信息与配置文件相互匹配则允许请求,否则将会直接进行忽略。

Host

Host 路由谓词工厂与 Header 类似,或者说两者都可以实现其效果,但 Host 路由谓词工厂很明显比 Hedaer 更加的简单且好理解。因此他主要的作用就是匹配所请求的主机是否和配置文件相互一致,否则将不具备转发功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
cloud:
gateway:
routes:
- id: after
uri: http://news.baidu.com/
predicates:
- Host=localhost:8081

application:
name: GatewayForward

server:
port: 8081

在上述配置文件中,请求的主机是 localhost:8081 因此与配置文件的信息相互匹配,可以通过网关使用路由进行跳转。

Method


Method 路由谓词工厂相比上述几个 header 类型的谓词工厂显得特别实用,他主要用指定访问资源以响应预检请求时允许的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
cloud:
gateway:
routes:
- id: after
uri: http://news.baidu.com/
predicates:
- Method=GET,POST

application:
name: GatewayForward

server:
port: 8081

就比如上述的配置文件,我们允许了 GET,POST 方法请求,但不允许 PUT 方法请求,因此使用该方法请求将会直接被网关所进行拦截。

Query


Query 路由谓词工厂主要是当 请求查询的查询参数与配置文件的相匹配,就比如 localhost:8081?name=111 正好与配置文件中的 query=namename 相互匹配,则允许请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
cloud:
gateway:
routes:
- id: after
uri: http://news.baidu.com/
predicates:
- Query=name

application:
name: GatewayForward

server:
port: 8081

RemoteAddre

RemoteAddre 与 Header 路由谓词工厂类似,简单理解就是他们的进阶版,他可以根据 CIDR 表示法进行匹配,也就是说当你写入规则是 192.168.0.1/24 时,你 192.168.0.104/24 也可以请求此接口,但如果你不是这个网段下的,则会被拒绝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
cloud:
gateway:
routes:
- id: remoteaddr_route
uri: http://news.baidu.com
predicates:
- RemoteAddr=192.168.0.1/24

application:
name: GatewayForward

server:
port: 8081

localhost 是 127.0.0.1 的地址映射,因此在 RemoteAddr 路由谓词工厂规则中他会被直接拒绝。

Weight


Weight 是权重路由谓词工厂,当 Gateway 中有多个路由网关配置时可以发挥作用,他主要分为 组(group) 以及 权重(weight) 进行组合:

1
2
3
4
5
6
7
8
9
10
11
12
spring:
cloud:
gateway:
routes:
- id: weight_one
uri: http://news.baidu.com
predicates:
- Weight=group1, 1
- id: weight_two
uri: http://baidu.com
predicates:
- Weight=group1, 8

就比如上述的配置中,weight_two 的权重比 weight_one 高,因此网关匹配的是 weight_two 的路由。

Gateway consul 服务转发


在正常的微服务架构中,都会选择依赖服务中心来进行注册,这通常需要对每个服务提供者进行单独的配置,Gateway 的出现爱你提供了默认转发的工作,当网关注册到服务中心后,网关则会代替服务中心来提供转发的服务

1
2
3
4
5
6
7
8
9
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

application.yml

application.yml 配置文件中,需要开启服务转发的选项,即 spring.cloud.gateway.discovery.locator.enabled 即可,之后在启动类中添加 @EnableDiscoveryClient 注解以此来实现服务发现:

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
application:
name: Gateway Consul
cloud:
consul:
host: localhost
port: 8500
gateway:
discovery:
locator:
enabled: true
server:
port: 8515

本文使用《江雪分析公开知识存储库知识共享许可证》进行发布


layout: spring-cloud
title: Ribbon 九大负载均衡策略
date: 2021-06-28 04:15:52
tags:


AbstractLoadBalancerRule 是一个 IRule 的继承,他本身则是一个负载均衡策略的抽象类,而 IRule 主要定义了 ILoadBalancer ,其最为核心的方法为 choose(),这是用于选择服务器对象(服务实例),如过没有将会返回 null。

而定义 IRule 主要的目的第一就是为了辅助负载均衡器(ILoadBalancer来通过负载均衡策略选择合适的服务实例,其默认使用的是 九大负载均衡 策略中的 线性轮询策略(RoundRobinRule)

线性轮询策略(Round Robin Rule)

由于 Round Robin Rule 是 ILoadBalancer 默认采用的负载均衡策略,因此也和他的流程非常符合,与此同时他还是作为著名的负载均衡策略,他主要定义了 AVAILABLE_ONLY_SERVERS(仅可用服务器)ALL_SERVERS(所有服务器) 两个状态。

  1. choose(ILoadBalancer lb, Object key) 方法从负载均衡中选择一个服务器并计数,如果没有则会返回“no load balancer”
1
2
3
4
5
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
  1. incrementAndGetModulo 方法会通过serverCount 函数来请求 allServers(所有服务器),而 incrementAndGetModulo 方法则会获取下一个索引,也就是说 获取所有服务器,并从 0 的基础上 +1
1
2
3
4
5
6
7
8
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
  1. 最后通过 choose(Object key) 通过索引去服务列表获取服务,如果连续 10次没有获取到服务,则会返回: “No available alive servers after 10 tries from load balancer”。假设 可达服务器(reachableServers)服务器总数(allServers) 为 0,则会输出: “No up servers available from load balancer” 错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();

if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}

int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);

if (server == null) {
/* Transient. */
Thread.yield();
continue;
}

if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}

// Next.
server = null;
}

if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}

重试策略(Retry Rule)

重试策略(Retry Rule)是 Rule 的级联,他采用 Round Robin Rule的 choose()方法来获取服务器实例,最大重试次数(maxRetryMillis)500,因此他在选择实例和重试方法是:

级联(cascade)在计算机科学中指多个对象之间一对多的映射关系,可以理解为一张表 A 用于存放学生所在班级(姓名、性别、年龄)而姓名作为主键,而另一张表 B 存放着楼层住户信息(姓名、性别)他们之间通过 姓名、年龄 来作为级联

  1. 如果通过 choose() 方法获取服务器实例正常,则回答并返回数据

  2. 假设超过了 最大的重试次数(maxRetryMillis) 没有获取到 “活着的服务器”,则返回 null

    1
    2
    3
    4
    5
    if ((answer == null) || (!answer.isAlive())) {
    return null;
    } else {
    return answer;
    }
  3. 最后如果没有活着的服务回答在当前时间小于 500 ms 的情况下,则会不断的在这时间段重试。

1
2
3
4
5
6
7
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}

加权响应时间策略(WeightedResponseTimeRule)

WeightedResponseTimeRule 是 RoundRobinRule 的延伸,对一些功能进行了扩展、可以根据服务实例的运行情况计算出服务实例的权重,之后进行服务实例的挑选。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;

while (server == null) {
// get hold of the current reference in case it is changed from the other thread
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();

int serverCount = allList.size();

if (serverCount == 0) {
return null;
}

int serverIndex = 0;

// last one in the list is the sum of all weights
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
if (maxTotalWeight < 0.001d) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}

server = allList.get(serverIndex);
}

if (server == null) {
/* Transient. */
Thread.yield();
continue;
}

if (server.isAlive()) {
return (server);
}

// Next.
server = null;
}
return server;
}
  1. choose 方法获取服务实例时,如果服务负载均衡器和所有访问器为空,则返回 null
  2. 之后当前所有服务的索引的最后一位,则是所有权重的总和,之后生成 0~总和 的随机权重
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
        try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;

// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}

}
}

在这其中作为最终要的角色则是 DynamicServerWeightTask 方法,主要通过 totalResponseTime(总响应时间) 来计算权重,其原理就是使用 ServerStats 在负载均衡器中捕获每个服务器的各种统计信息,那么总响应时间就是通过他的 getResponseTimeAvg 方法获取处理请求的平均总时间,以毫秒为单位,这个过程主要找到最大 95% 的响应时间

这样每个服务器的权重就是 所有服务器响应时间的总和减去响应时间 (totalResponseTime - getResponseTimeAvg) 这样得出的结果就是 响应时间越长则权重越小,则选中的几率就很小

随机策略(RandomRule)

RandomRule 主要是一个在现有服务器之间随机分配流量的负载均衡,他主要的步骤是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;

while (server == null) {
if (Thread.interrupted()) {
return null;
}
List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();

int serverCount = allList.size();
if (serverCount == 0) {
/*
* No servers. End regardless of pass, because subsequent passes
* only get more restrictive.
*/
return null;
}

int index = rand.nextInt(serverCount);
server = upList.get(index);

if (server == null) {
/*
* The only time this should happen is if the server list were
* somehow trimmed. This is a transient condition. Retry after
* yielding.
*/
Thread.yield();
continue;
}

if (server.isAlive()) {
return (server);
}

// Shouldn't actually happen.. but must be transient or a bug.
server = null;
Thread.yield();
}

return server;

}
  1. 通过choose 方法获取到服务器实例,如果负载均衡器和服务器为0则返回 null
  2. 首先他会通过 upList 以及 allList() 分别获取存活服务器列表和所有服务器列表,之后通过 Random 来生成一个随机数生成器
  3. 如果服务器正常运行,则返回该服务,并对可以请求的服务器标注一个随机的值,这就让每次通过 choose 方法获取到的服务器实例都会有一个随机的标注。

客户端配置启用线性轮询策略(ClientConfigEnabledRoundRobinRule)

客户端配置启用线性轮询策略,从名字上可以看出他就是为了启用 RoundRobinRule 策略的,因此他的整个流程都是通过 RoundRobinRule 来使用负载均衡器。而 choose 方法则也是通过 RoundRobinRule 来实现的,如果没有使用他则会出现 :“This class has not been initialized with the RoundRobinRule class” 的报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {

RoundRobinRule roundRobinRule = new RoundRobinRule();

@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
roundRobinRule = new RoundRobinRule();
}

@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
roundRobinRule.setLoadBalancer(lb);
}

@Override
public Server choose(Object key) {
if (roundRobinRule != null) {
return roundRobinRule.choose(key);
} else {
throw new IllegalArgumentException(
"This class has not been initialized with the RoundRobinRule class");
}
}

}

最大空闲策略(BestAvailableRule)

该规则主要用于选择并发请求最少的服务器实例来提供服务,延伸自 ClientConfigEnabledRoundRobinRule,通过 LoadBalancerStats 来统计每个服务器的特征和信息,以此来过滤失败的服务实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
List<Server> serverList = getLoadBalancer().getAllServers();
int minimalConcurrentConnections = Integer.MAX_VALUE;
long currentTime = System.currentTimeMillis();
Server chosen = null;
for (Server server: serverList) {
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
if (!serverStats.isCircuitBreakerTripped(currentTime)) {
int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
}
}
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}

如果 loadBalancerStats 不为 null,则找出最小的并发连接数(minimalConcurrentConnections)来使用。假设 loadBalancerStats 为 null,则通过 ClientConfigEnabledRoundRobinRule 类中的 choose 类来使用线性轮询策略。

过滤线性轮询策略(PredicateBasedRule)

过滤线性轮询策略主要在过滤给定的服务器列表和负载均衡器后,通过内部定义的一个过滤器来筛选出服务实例清单,之后通过线性轮询方式过滤服务实例,并从清单选取一个服务实例。

.AbstractServerPredicate 是服务器过滤逻辑的基本构建块,可用于规则和服务器列表过滤器。 谓词的输入对象是PredicateKey,里面有Server和负载均衡器的key信息。

1
2
3
4
5
6
7
8
9
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
AbstractServerPredicate

过滤线性轮询的主要功能主要通过 AbstractServerPredicate 进行实现,而 AbstractServerPredicatePredicate 的实现。

.Predicate 主要是用于确定给定输入的 true 或 false,因此也被称之为 “谓词”。

在数学逻辑中,谓词通常使用大写罗马字母表示,如P、Q、R这些,根据其变量值来可能为 “真假(true or false)” 的陈述。

也可以理解为一个运算符或函数(布尔值函数),根据输入来返回一个 false or true。

  1. 在初始阶段,他主要会通过 LoadBalancerStats 来获取负载均衡器的统计信息
    1. 如果为空则返回负载均衡器的信息。
    2. 不为空的话获取负载均衡器(ILoadBalancer),在获取负载均衡器的统计信息,并返回设置的负载均衡器统计信息(setLoadBalancerStats)。
      1. 如果未找到 LoadalancerStats 未找到则返回 null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected LoadBalancerStats getLBStats() {
if (lbStats != null) {
return lbStats;
} else if (rule != null) {
ILoadBalancer lb = rule.getLoadBalancer();
if (lb instanceof AbstractLoadBalancer) {
LoadBalancerStats stats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
setLoadBalancerStats(stats);
return stats;
} else {
return null;
}
} else {
return null;
}
}
  1. 之后通过 getServerOnlyPredicate 来获取布尔值函数(即谓词),并通过 getEligibleServers 筛选出合格的服务器。最后使用 chooseRandomlyAfterFiltering 方法筛选并随机选择服务器实例,最后也可以通过 chooseRoundRobinAfterFiltering 方法来创建循环选择过筛选后的服务器实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public Optional<Server> chooseRandomlyAfterFiltering(List<Server> servers) {
List<Server> eligible = getEligibleServers(servers);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(random.nextInt(eligible.size())));
}

/**
* Choose a server in a round robin fashion after the predicate filters a list of servers. Load balancer key
* is presumed to be null.
*/
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers) {
List<Server> eligible = getEligibleServers(servers);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(nextIndex.getAndIncrement() % eligible.size()));
}

/**
* Choose a random server after the predicate filters list of servers given list of servers and
* load balancer key.
*
*/
public Optional<Server> chooseRandomlyAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(random.nextInt(eligible.size())));
}

/**
* Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.
*/
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(nextIndex.getAndIncrement() % eligible.size()));
}
public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
return new AbstractServerPredicate() {
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
public boolean apply(PredicateKey input) {
return p.apply(input);
}
};
}

/**
* Create an instance from a predicate.
*/
public static AbstractServerPredicate ofServerPredicate(final Predicate<Server> p) {
return new AbstractServerPredicate() {
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
public boolean apply(PredicateKey input) {
return p.apply(input.getServer());
}
};
}
区域感知轮询策略(ZoneAvoidanceRule)

该策略是 过滤线性轮询策略(PredicateBasedRule) 的实现类,主要以 区域和可用性过滤服务器的规则 为基础,也通过组合过滤条件和该策略本身的过滤条件相辅相成,而 AbstractServerPredicate 则为次过滤条件。

他的过滤条件也非常的简单,首先,需要通过 randomChooseZone 来随机选择服务器,之后 selectedZone 来选择区域并返回,之后 totalServerCount 统计服务器总数,以及获取实例数(getInstanceCount)。

在配合 indexsum,其中 index 及服务器总数,每发现一个 +1。而 sum 则是实例数的总和,当服务总数小于总和时,那么将会选择该区域并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
Set<String> chooseFrom) {
if (chooseFrom == null || chooseFrom.size() == 0) {
return null;
}
String selectedZone = chooseFrom.iterator().next();
if (chooseFrom.size() == 1) {
return selectedZone;
}
int totalServerCount = 0;
for (String zone : chooseFrom) {
totalServerCount += snapshot.get(zone).getInstanceCount();
}
int index = random.nextInt(totalServerCount) + 1;
int sum = 0;
for (String zone : chooseFrom) {
sum += snapshot.get(zone).getInstanceCount();
if (index <= sum) {
selectedZone = zone;
break;
}
}
return selectedZone;
}

当选择完后,就是通过 availableZones 方法获取可用区域,如果可用区数量等于1,则返回可用区(availableZones),当实例数为0时,开始移出可用区。

1
2
3
if (availableZones.size() == 1) {
return availableZones;
}

移出可用区的方式有很多,其中就是 获取心跳次数除于实例总数大于等于服务无回应百分比或每台服务器负载,小于0的,也会被移出可用区。

1
2
3
4
5
6
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
}

或者说每台服务器负载(loadPerServer)减去每台服务器最大负载(maxLoadPerServer)小于 0.000001d,则会被添加至最差区域(worstZones)。

1
2
3
4
5
6
7
8
9
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}

也可以通过每台服务器负载(loadPerServer)大于服务器最大负载(maxLoadPerServer)的方式让其加入到最差区域中。

至于随机选择区(randomChooseZon),如果不为0,也会被移出可用区(因为避免区域被你霸占了),否则返回可用区。

1
2
3
4
5
6
7
    String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;

}

最后,当一系列的区域过滤完成后,通过线性轮询的方式从过滤结果中选出一个服务实例。

可用性过滤策略(AvailabilityFilteringRule)

该策略根据宕机或超过请求时限的活动连接来分配权重,他是 PredicateBasedRule 的延伸,其主要还是通过 AbstractServerPredicate 来实现具体的功能。

负载均衡策略自定义

自定义负载均衡策略需要先运行 consul 除服务消费者之外的集群,之后新建项目添加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-ribbon</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
<version>3.0.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.netflix.ribbon/ribbon-loadbalancer -->
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon-loadbalancer</artifactId>
<version>2.7.18</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
</dependencies>

application.properties

1
2
3
4
5
spring.application.name=Ribbon Rule
server.port=9003
spring.cloud.consul.host=localhost
spring.cloud.consul.discovery.service-name=service-provider # 服务提供者名字
spring.cloud.consul.discovery.register=false # 是否注册服务

Application

在启动类中通过 @LoadBalancd 以及 @Bean 来实例化 restTemplate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;

import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

/**
* 启动类,通过 @LoadBalanced 来开启客户端的负载均衡
* @author kunlun
* @date 2021/06/22
*/
@SpringBootApplication
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}


/**
* 实例化 RestTemplate
* @return RestTemplate
*/
@LoadBalanced
@Bean
RestTemplate restTemplate() {
return new RestTemplate();
}

}

RibbonConfig

在配置类中启用 Configuration 注解,并使用 RibbonClient 来配置服务提供者名称,最后通过 Bean 注解实例化负载均衡策略(RandomRule)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.RandomRule;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 配置类,通过 @RibbonClient 注解来配置服务名以及实例化
* @author kunlun
* @date 2021/06/29
*/
@Configuration
@RibbonClient(name = "service-provider", configuration = RibbonClientConfiguration.class)
public class RibbonConfig {

/**
* 实例化 Ribbon 的随机策略(RandomRule)
* @return RandomRule
*/
@Bean
public IRule irule() {
return new RandomRule();
}
}

TestController

测试类主要用于通过负载均衡器来使用负载均衡策略实现选择服务实例的效果:

运行后浏览器打开 http://localhost:9003/hey 每次刷新会得到不同的服务提供者实例返回的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.example.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;

@RestController
public class TestController {

@Autowired
private LoadBalancerClient loadBalancerClient;

Date date = new Date();

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyy-MM-dd hh:mm:ss");

@GetMapping("/hey")
public String hey() {
ServiceInstance serviceInstance = loadBalancerClient.choose("service-provider");
String callService = "Host: " + serviceInstance.getHost() + " Port: " + serviceInstance.getPort() + " Date:" + simpleDateFormat.format(date);
return callService;
}

}

本文使用《江雪分析公开知识存储库知识共享许可证》进行发布

Consul Hystrix Dashboard


Hystrix Dashboard 即 “Hystrix 仪表盘”,也是除了容错之外还提供的实时监控功能,他会实时的累加记录所有关于 HystrixCommand 执行的信息,其中包含了每秒执行了多少请求,以及成功或失败等信息。

Hystrix Dashboard 是一款针对 Hystrix 进行实时监控的工具,可以实现出实时监控数据,并直观的显示 Hystrix Command 的请求响应时间、请求成功率等

因为 Hystrix Dashboard 仅仅实现了数据的监控,因此 我们需要在 Hystrix Fallback 的基础上进行扩展,其中启动类需要修改并添加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-netflix-hystrix -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-netflix-hystrix-dashboard -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.example.demo;

import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.hystrix.dashboard.EnableHystrixDashboard;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;

/**
* 启用服务发现以及 feign、Hystrix Dashboard 和服务熔断等注解支持,并配置 Servlet
*
* @author kunlun
* @date 2021/7/6
*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@EnableHystrixDashboard
@EnableCircuitBreaker
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}

@Bean
public ServletRegistrationBean servletRegistrantionBean() {
HystrixMetricsStreamServlet hystrixMetricsStreamServlet = new HystrixMetricsStreamServlet();
ServletRegistrationBean registrationBean = new ServletRegistrationBean(hystrixMetricsStreamServlet);
registrationBean.setLoadOnStartup(1);
registrationBean.addUrlMappings("/actuator/hystrix.stream");
registrationBean.setName("HystrixMetricsStreamServlet");
return registrationBean;
}
}

application.properties

1
2
3
4
5
6
7
spring.application.name=HystrixDashboard
server.port=9502
spring.cloud.consul.host=localhost
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.service-name=service-provider
feign.hystrix.enabled=true
hystrix.dashboard.proxy-stream-allow-list=* # 设置仪表板代理流允许列表为所有

当一切完成之后访问 http://localhost:9502/hystrix 并在输入框输入 http://localhost:9502/actuator/hystrix.stream 即可。

Hystrix Turbine 与 Hystrix Dashboard 的区别就是一个只能看到单个应用程序内的服务信息,而另一个则是多个实例集群的状态形式展现,因此 Hystrix Turbine 由此而生。

强烈建议使用 Eureka

本文使用《江雪分析公开知识存储库知识共享许可证》进行发布

Spring cloud Hystrix 熔断实现


通常回退方法的实现有两种,一种是通过 feign 来进行实现,另一种解决方案则是根据 @HystrixCommand 注解进行实现,在此之前我们需要添加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-netflix-hystrix -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

之后在启动类中添加注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

/**
* 启动类,分别开启 consul、feign 以及服务熔断
*
* @author kunlun
* @date 2021/7/5
*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@EnableCircuitBreaker
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}

}

尽管 @EnableCircuitBreaker3.0.1 开始 Hystrix 就不在 Spring cloud Netfilx 中了,但是他是唯一一个实现断路器的注解

https://martinfowler.com/bliki/CircuitBreaker.html

application.properties

1
2
3
4
5
6
7
spring.application.name=Hystrix
server.port=9900
spring.cloud.consul.host=localhost
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.service-name=service-provider
spring.cloud.consul.discovery.register=false
feign.hystrix.enabled=true # 开启 Hystrix

service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.example.demo.service;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

/**
* 使 Feign 支持 Hystrix,即添加 fallback 处理类,在服务熔断时返回 fallback 类中内容
*
* @author kunlun
* @date 2021/7/08
*/
@FeignClient("service-provider")
public interface MyFeignFallbackClient {

/**
* 添加指定的 fallback 类,在服务熔断时返回 fallback 类内容
* @return
*/
@GetMapping("/hey")
String hey();
}

controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.example.demo.controller;

import com.example.demo.service.MyFeignFallbackClient;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 调用 FeiginFallback 客户端
*
* @author kunlun
* @date 2021/7/5
*/
@RestController
public class HeyController {

@Autowired
private MyFeignFallbackClient myFeignFallbackClient;

@RequestMapping("/hey")
@HystrixCommand(fallbackMethod = "defaultCities")
public String index() {
String fallback = myFeignFallbackClient.hey();
return fallback;
}

public String defaultCities() {
return "No server!!!";
}

}

本文使用《江雪分析公开知识存储库知识共享许可证》进行发布

Spring cloud 雪崩效应与 Hystrix 容错方案

Hystrix 是由 Netflix 开源的一个延迟和容错库,旨在隔离对远程系统、服务和第三方库的访问点,他主要通过:“隔离、降级、融断、缓存”,等方式来避免或解决雪崩效应。处理和预防这类问题的项目也很多,通常主流的是 Sentinel、Hystrix、Resillience4j 三类解决方案。

Sentinel

阿里巴巴于 2012年 发布的面向分布式服务架构的轻量级流量控制组件。

Resilience4j

他是一个受 Netfilx Hystrix 启发的轻量级容错库,专门为函数式编程而设计

雪崩效应

雪崩效应也被称之为 “服务雪崩”,而雪崩效应的造成就是 级联 的崩溃,假设服务A调用服务B,服务B调用服务C,服务C调用服务D,这其中任何一个节点无法使用,就会造成下面的服务无法使用,这就会产生级联故障。

如果这种请求很多,那么将会导致不可用的服务越来越多,之后占用的计算机资源也越多,而这些请求则会消耗系统资源,导致其他请求也不可用,这种现象也被称之为:“服务雪崩”。

也就是说当服务提供者导致了服务消费者不可使用,并将问题逐渐放大至微服务系统,进而造成系统崩溃。

造成雪崩原因

雪崩效应的造成原因可以是流量激增、硬件故障、bug、缓存、资源耗尽、线程同步等待、配套资源不可用等:

流量激增

在目前,造成流量激增的方式可以恶意攻击,或者说是爬虫造成的都有可能,这种主要体现在了京东 6·18 活动中。

硬件故障

假设一个集群中,有两个服务提供者,一个服务提供者的硬件损害。那么整个集群就成为了单点应用,导致服务的压力加大,从而出现了服务延迟,由于服务延迟的不断增加,最终导致了雪崩。

bug

通常程序中出现 bug 都是正常的事情,在 spring cloud 中,假设有一个不应该被类使用的接口被调用,则会造成服务之间的互相调用导致的循环逻辑问题,则会可能导致服务雪崩。

缓存问题

缓存问题主要可以分为缓存穿透、缓存击穿、缓存雪崩等三个问题:

缓存穿透

缓存穿透指的是用户不断请求数据库中没有的数据,这将会导致数据库的压力过大。至于这种问题的防范问题,可以通过下述方法解决:

  1. 在路由中增加权限校验
  2. 设置最大(max)与最小(min)的拦截请求(大于等于或小于等于)
缓存击穿

缓存击穿是指 缓存没有数据,但数据库中有数据(通常是缓存时间到期了),当用户在缓存中没有得到数据,就从数据库中获得数据,从而让数据库压力增大,就会造成缓存击穿。

至于预防缓存击穿,那么就就是将 热点数据 设置为永不过期,来解决问题。

缓存雪崩

缓存雪崩说直白一点,就是缓存大量过期,而此时很多用户都是查询缓存中的数据,从而将流量转向数据库,进一步导致数据库宕机。预防这种问题需要通过将缓存数据的过期时间为 随机,以此来防止数据缓存同一时间大量过期。 如果将缓存击穿的问题解决了,那么缓存雪崩的问题也可以适当的预防。

资源耗尽

资源耗尽主要由两种原因造成的原因可以是:“服务提供者不可用,导致服务消费者等待,进而造成资源消耗”,也可是用户的大量流量请求,以及重试流量加大导致的。

线程同步等待

假设系统采用的是服务调用模式,核心服务和非核心服务共用一个线程池以及消息队列,假设一个核心的业务线程调用了一个非核心进程,那么这个非核心进程一旦出现问题,则导致核心线程出现问题。如果核心线程断了,那么将会导致后续的线程也一一崩溃,最终导致雪崩。

配套资源不可用

配套资源不可用可能是数据中心不可用或者电信基础网络服务出现问题,欣慰的是这类事故出现的概率极低。

Hystrix 的解决方式

熔断机制

至于 Hystrix 熔断机制,他主要通过融断器来执行融断的工作,熔断器在现实生活中的运用主要是 “保险丝”。

保险丝(fuse)的在电器的主要作用就是 当电器的电流异常升高到一定热度的时候,那么保险丝自身熔断来切断电流,以此来保护电路的安全运行

快速失败及熔断机制


Hystrix 的熔断器可以实现快速失败,所谓快速失败就是当一个问题达到了某个规则的阀值,则该请求以后调用任何接口都会失败,最后导致不会访问远程服务器,以此来防止应用程序和资源的消耗。

熔断器能够诊断错误是否经过修正,如果已经修正那么熔断器则会关闭,让应用程序在此调用操作。熔断器能够记录最近调用所发生的错误次数,假设熔断器小于阀值,则可以关闭,否则继续开启。

熔断在正常状态下,一般处于关闭状态,如果调用的服务出错值达到莫个阀值,则会让其快速失效,以此来避免大量的无效请求影响系统。

一段时间过后熔断机制将会进入半尝试状态,允许少量请求进行尝试,如果成功则关闭熔断状态,否则将继续持续。

隔离机制

隔离机制主要分为线程池隔离以及信号量隔离模式,在病毒传播中为了防止病毒扩散,最好的方式就是隔离病毒携带者,从而来保护正常的个体。这种方式在分布式系统中也采用隔离的方式进行容错处理。

线程池隔离模式


在 Hystrix 的线程池隔离模式下,会为每一个 依赖建立一个线程池,以此来存储当前的依赖请求,线程池对请求进行处理。 线程池的主要作用就是隔离线程的依赖,以此来限制线程的并发访问和阻塞扩张。

当流量达到峰值的时候,如果不能及时处理的请求将会被存储到线程池后进行处理。运营环境被隔离时,会根据依赖划分多个线程池,以此来进行资源隔离,就算调用的服务代码存在 bug,也不会对系统的其他服务造成影响。

信号量隔离模式


信号量隔离模式主要用于记录当前请求的数量,他内在存在了一个类似 “规则” 的,当信号量小于等于规则内最大的数量时,将会丢弃请求,否则将会执行请求,且对当前的信号量加一。

服务降级

服务降级的主要机制就是 如果一个资源快不够了,则需要将某些服务先关掉,之后等到资源足够时在进行开启,和熔断的目的相差无几,以此来保证上游服务的稳定性。根据业务的不同,降级也分为两种模式,分别为:

Fallback

Fallback 一词也被称之为 “倒退” 的意思,及如果服务失败,那么则会通过 fallback 的方式进行降级。

服务级联模式

服务级联模式则是如果服务失败,则需要调用备用服务,服务级联模式会尽可能的返回数据。

这样的做法如果考虑不充分会造成级联的崩溃,假设在缓存失败后将全部流量导到数据库请求中,那么数据库可能会直接崩溃,因此级联模式也考验开发者的能力

简单来讲服务降级的两种方式分别是分级,将故障的服务丢弃,而服务级联模式则是将请求转移在另一个集群上。

缓存机制

缓存机制就是将请求的所有结果进行缓存,如果有相同的请求发送过来,则直接从缓存中取出结果,以此来减少请求的开销。

本文使用《江雪分析公开知识存储库知识共享许可证》进行发布